Conversation

@aiborodin
Contributor

@aiborodin aiborodin commented Sep 17, 2025

Addresses the following issue: #14090.

Fix the issue by aggregating WriteResult objects by table and branch (aka TableKey), so that a single committable is emitted per table, branch, and checkpoint. This requires serialising the aggregated WriteResult and saving it in the Flink checkpoint instead of in a temporary manifest file, because, according to the Iceberg spec, a single manifest must contain files with only one partition spec, while we may aggregate write results spanning multiple partition specs.
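For illustration only, here is a minimal sketch of the aggregation idea, assuming a simple map keyed by table and branch; the class and method names are placeholders rather than the PR's actual code:

import java.util.HashMap;
import java.util.Map;

import org.apache.iceberg.io.WriteResult;

// Sketch: merge the per-writer WriteResults of the current checkpoint so that
// exactly one committable per table/branch is emitted when the checkpoint completes.
class PerTableAggregator {
  // One builder per "table@branch" key.
  private final Map<String, WriteResult.Builder> pending = new HashMap<>();

  void add(String table, String branch, WriteResult result) {
    pending
        .computeIfAbsent(table + "@" + branch, key -> WriteResult.builder())
        .add(result);
  }

  // Called once per checkpoint: emit the merged WriteResult per table/branch and
  // reset the state for the next checkpoint.
  Map<String, WriteResult> flush() {
    Map<String, WriteResult> merged = new HashMap<>();
    pending.forEach((key, builder) -> merged.put(key, builder.build()));
    pending.clear();
    return merged;
  }
}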

@aiborodin aiborodin force-pushed the fix-commit-idempotence-of-dynamic-sink branch from 83cc431 to 5e36867 on September 17, 2025 10:21
@aiborodin
Contributor Author

@mxm @pvary I would appreciate your review.

Contributor

@mxm mxm left a comment


Thanks for the PR @aiborodin! I'm going to have to digest this a bit. Seems like we are addressing two issues here:

  1. Fixing a potential issue in DynamicCommitter with processing multiple WriteResult for a given table.
  2. Refactoring the commit logic to produce one snapshot per spec change.

It would make things easier to review if we fixed (1) and then (2), perhaps in separate commits. I need to dig my head a bit deeper into the changes to understand them properly.

Comment on lines +141 to +170
@Override
public String toString() {
  return MoreObjects.toStringHelper(this)
      .add("dataFiles", dataFiles)
      .add("deleteFiles", deleteFiles)
      .add("referencedDataFiles", referencedDataFiles)
      .add("rewrittenDeleteFiles", rewrittenDeleteFiles)
      .toString();
}

@Override
public boolean equals(Object other) {
  if (other == null || getClass() != other.getClass()) {
    return false;
  }
  WriteResult that = (WriteResult) other;
  return Objects.deepEquals(dataFiles, that.dataFiles)
      && Objects.deepEquals(deleteFiles, that.deleteFiles)
      && Objects.deepEquals(referencedDataFiles, that.referencedDataFiles)
      && Objects.deepEquals(rewrittenDeleteFiles, that.rewrittenDeleteFiles);
}

@Override
public int hashCode() {
  return Objects.hash(
      Arrays.hashCode(dataFiles),
      Arrays.hashCode(deleteFiles),
      Arrays.hashCode(referencedDataFiles),
      Arrays.hashCode(rewrittenDeleteFiles));
}
Contributor

Do we strictly need equals / hashCode for the implementation?

Contributor

I'm asking because this is in core and potentially affects other engines.

Contributor

Same here

Contributor Author

I'm asking because this is in core and potentially affects other engines.

You're right, it's not strictly necessary for the implementation. However, there have been a few instances in this codebase of decomposing and manually checking subsets of WriteResult or DataFile/DeleteFile because these methods were missing, so adding them simplifies the DynamicWriteResultAggregator tests as well as the DynamicWriteResult and WriteResult serialisation tests.

I don't see any harm in adding them. Why would other engines rely on equals / hashCode not being implemented? In this codebase, apart from Flink, WriteResult is only used in Spark, in SparkPositionDeltaWrite, as a simple data class.

Contributor

@pvary pvary Sep 18, 2025

It is very easy to rely on equals/hashCode inadvertently. Consider HashSet, HashMap, etc.

equals is typically used in many places, but before this change it defaulted to instance equality, which is very different.

Contributor Author

That's clear. Do you have any specific examples where you think this could be problematic? I can't think of any use case where someone would use BaseFile implementations as keys in a HashMap or put them in a HashSet without meaningful equals/hashCode implementations.

Contributor Author

@pvary I am happy to extract the equals/hashCode changes into a separate PR and, if you prefer, manually decompose the properties of WriteResult and Data/DeleteFiles in the tests in this PR, although I still believe it should be totally fine to add these methods to simplify the code.

@pvary
Contributor

pvary commented Sep 17, 2025

There is no guarantee that the WriteResult serialization would remain compatible between Iceberg releases. OTOH, the manifest serialization should be backwards compatible. This is the reason why a manifest is used to store the temporary data that is part of a checkpoint. I don't think we would like to change this.

Also, writing temporary files is costly. We need to check the cost of writing more manifest files if we decide to go down that way. We might be better off writing more sophisticated commit logic and generating better commit keys (maybe adding specId too).

@aiborodin aiborodin changed the title from "DD-7055 Fix commit idempotence of DynamicIcebergSink" to "Fix commit idempotence of DynamicIcebergSink" on Sep 18, 2025
@aiborodin aiborodin force-pushed the fix-commit-idempotence-of-dynamic-sink branch from 5e36867 to dca52ce on September 18, 2025 02:14
@aiborodin
Contributor Author

  1. Fixing a potential issue in DynamicCommitter with processing multiple WriteResult for a given table.
  2. Refactoring the commit logic to produce one snapshot per spec change.

It would make things easier to review if we fixed (1) and then (2), perhaps in separate commits. I need to dig my head a bit deeper into the changes to understand them properly.

@mxm How do you envision splitting these two points? The root problem is DynamicWriteResultAggregator producing multiple commit requests per table, branch, and checkpointId triplet. The DynamicCommitter is agnostic to this and commits upstream requests; it doesn't have any special handling for partition spec changes. I cleaned up DynamicCommitter to reflect a single commit expectation, which feels like it should be part of this change.

@aiborodin
Contributor Author

@pvary, that's a good point. I think we can achieve the same backwards compatibility guarantees without writing temporary manifest files and instead write data and delete files directly into the checkpoint as Avro data. Let me experiment with this and try to get a version with Avro serialisation working, which would give us backwards compatibility.

@pvary
Contributor

pvary commented Sep 18, 2025

I think we can achieve the same backwards compatibility guarantees without writing temporary manifest files and instead write data and delete files directly into the checkpoint as Avro data. Let me experiment with this and try to get a version with Avro serialisation working, which would give us backwards compatibility.

There is an additional thing we should consider: what happens when the attributes of the Data/Delete files change? With the current approach - storing the manifest - we automatically pick up changes done in core. If we implement our own serializer/deserializer for them, we need to keep an eye out for these changes and implement the corresponding updates in our serializers every time.

@mxm
Contributor

mxm commented Sep 23, 2025

@mxm How do you envision splitting these two points? The root problem is DynamicWriteResultAggregator producing multiple commit requests per table, branch, and checkpointId triplet. The DynamicCommitter is agnostic to this and commits upstream requests; it doesn't have any special handling for partition spec changes. I cleaned up DynamicCommitter to reflect a single commit expectation, which feels like it should be part of this change.

I've left some thoughts in #14090. I also have an implementation which I'll share. We can then discuss which is the best approach to proceed with.

Basically, the idea is to use as few snapshots as possible. We can combine append-only WriteResults into a single snapshot. Whenever delete files are present, we need multiple snapshots. To make this fault tolerant, we need to store an index into the list of write results and persist it as part of the snapshot summary, similar to the Flink checkpoint id. We can then skip any previously applied WriteResults on recovery.
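A rough sketch of that summary-index idea, assuming a hypothetical summary property name and leaving out the job-id/checkpoint-id matching that real code would need:

import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

class SummaryIndex {
  // Hypothetical property; the Flink committer already records commit metadata
  // (such as the checkpoint id) in the snapshot summary in a similar fashion.
  private static final String WRITE_RESULT_INDEX = "flink.write-result-index";

  // Stamp each snapshot with the index of the WriteResult it covered.
  static void commitWithIndex(RowDelta rowDelta, int index) {
    rowDelta.set(WRITE_RESULT_INDEX, Integer.toString(index)).commit();
  }

  // On recovery, decide where to resume within the checkpoint's list of write results.
  // Real code would also verify the Flink job id and checkpoint id before trusting the index.
  static int resumeFrom(Table table) {
    Snapshot current = table.currentSnapshot();
    String committed = current == null ? null : current.summary().get(WRITE_RESULT_INDEX);
    return committed == null ? 0 : Integer.parseInt(committed) + 1;
  }
}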

As for multiple partition specs per snapshot, I couldn't find anything saying that this is not permitted in Iceberg.

@aiborodin
Contributor Author

aiborodin commented Sep 24, 2025

Whenever delete files are present, we need multiple snapshots.

@mxm Could you please clarify why we need multiple snapshots per checkpoint when delete files are present? Wouldn't Iceberg core already handle deletions in the aggregated WriteResult from multiple parallel writers per checkpoint?
The current implementation of IcebergSink commits both deletes and appends in a single snapshot using RowDelta.
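For reference, the single-snapshot pattern looks roughly like the sketch below; it is simplified, omits validation, branch targeting, and retry handling, and the summary property name is only an assumption about what the Flink committer records:

import java.util.Arrays;

import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.WriteResult;

class SingleSnapshotCommit {
  // Apply all appends and deletes of one checkpoint in a single RowDelta snapshot.
  static void commit(Table table, WriteResult merged, long checkpointId) {
    RowDelta rowDelta = table.newRowDelta();
    Arrays.stream(merged.dataFiles()).forEach(rowDelta::addRows);
    Arrays.stream(merged.deleteFiles()).forEach(rowDelta::addDeletes);
    // Record the checkpoint id in the snapshot summary so a restarted job can
    // detect that this checkpoint was already committed (property name assumed).
    rowDelta.set("flink.max-committed-checkpoint-id", Long.toString(checkpointId));
    rowDelta.commit();
  }
}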

@mxm
Contributor

mxm commented Sep 24, 2025

@aiborodin Regular data files and delete files can both be part of the same snapshot (actually, have to be for upsert semantics). However, we have to create a table snapshot before we process WriteResults with delete files. The reason is that data and delete files are not ordered, but deletes often require an order to be applied correctly.

For example: WriteResult w1 (append-only) and WriteResult w2 (delete + append).

If we combined w1 and w2 into a single snapshot, Iceberg would apply the delete files first, deleting the relevant rows, and then apply the appends from both WriteResults. As a result, any deletes matching rows appended in w1 would not get deleted. Deletes are always applied before appends; there is no order between data files and delete files, which honestly feels like a limitation of Iceberg.

The semantics are totally different when we first create a table snapshot for w1, because any deletes by w2 would be applied on top of this snapshot and before appending data via w2. That's the reason why I think we can combine WriteResults into a single snapshot, as long as we don't see delete files. As soon as we discover a delete file, we need to create a table snapshot with the so-far aggregated WriteResults.
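A minimal sketch of that batching rule, assuming placeholder names and a caller-supplied commit function; this is only an illustration of the idea described above, not an agreed design:

import java.util.function.Consumer;

import org.apache.iceberg.io.WriteResult;

class AppendOnlyBatcher {
  private WriteResult.Builder batch = WriteResult.builder();

  // 'commit' turns one merged WriteResult into one table snapshot.
  void process(WriteResult result, Consumer<WriteResult> commit) {
    if (result.deleteFiles().length > 0) {
      // A result carrying deletes must see the previously appended files as an
      // earlier snapshot, so flush the aggregated appends first and then commit
      // the delete-carrying result on its own.
      flush(commit);
      commit.accept(result);
    } else {
      batch.add(result);
    }
  }

  // Flush whatever append-only results were aggregated so far.
  void flush(Consumer<WriteResult> commit) {
    WriteResult merged = batch.build();
    if (merged.dataFiles().length > 0) {
      commit.accept(merged);
    }
    batch = WriteResult.builder();
  }
}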

@aiborodin
Contributor Author

The semantics are totally different when we first create a table snapshot for w1, because any deletes by w2 would be applied on top of this snapshot and before appending data via w2

@mxm, so are you saying we shouldn't aggregate WriteResults with delete files? If so, that will create too many commits/snapshots at high writer parallelism, as each writer would emit a separate WriteResult.

@mxm
Contributor

mxm commented Sep 24, 2025

Yes. That's why the code in the main branch is designed that way. IMHO we can only aggregate WriteResults without delete files (append-only), unless we change Iceberg core to enforce an order on how data / delete files are applied. I'd be curious to hear @pvary's thoughts on this.

@pvary
Contributor

pvary commented Sep 24, 2025

IMHO we can only aggregate WriteResults without delete files (append-only), unless we change Iceberg core to enforce an order on how data / delete files are applied. I'd be curious to hear @pvary's thoughts on this.

We can only commit files for multiple checkpoints when there are only appends/data files in the checkpoint.

For Iceberg, everything that is committed in a single transaction happened at the same time. So if we have equality deletes for both checkpoints, they will be applied together.

Consider this scenario:

  • R1 insert - new data file (DF1) with R1 (PK1)
  • C1 commit
  • R1' update - new equality delete file with PK1 (EQ1), new data file (DF2) with R1'
  • C2 commit
  • R1'' update - new equality delete file with PK1 (EQ2), new data file (DF3) with R1''
  • C3 commit

If we merge the C2 and C3 commits, then we add EQ1 and EQ2 in the same commit, and they will be applied only to DF1. They will not be applied to DF2 or DF3, as those are added in the same commit, and as a result we will have duplication in our table: both R1' and R1'' will be present after C3.

@aiborodin
Contributor Author

They will not be applied to DF2 or DF3, as those are added in the same commit, and as a result we will have duplication in our table: both R1' and R1'' will be present after C3.

Thank you for the detailed clarification on this, @pvary and @mxm. My change does not aggregate WriteResults across checkpoints. Each checkpoint would create a separate snapshot with its own delete and data files. The DynamicCommitter code in this change demonstrates this:

  private void commitDeltaTxn(
      Table table,
      String branch,
      NavigableMap<Long, Committer.CommitRequest<DynamicCommittable>> pendingRequests,
      CommitSummary summary,
      String newFlinkJobId,
      String operatorId) {
    for (Map.Entry<Long, CommitRequest<DynamicCommittable>> e : pendingRequests.entrySet()) {
      // We don't commit the merged result into a single transaction because for the sequential
      // transaction txn1 and txn2, the equality-delete files of txn2 are required to be applied
      // to data files from txn1. Committing the merged one will lead to the incorrect delete
      // semantic.
      WriteResult result = e.getValue().getCommittable().writeResult();

The current change would only aggregate WriteResults across multiple parallel writers per (table, branch, checkpoint) triplet, similar to the current non-dynamic IcebergSink.

Given the above, @mxm, do you still think we need to commit multiple WriteResults separately for (table, branch, checkpoint) triplet and implement the index-based solution to guarantee idempotency as you mentioned here: #14090 (comment)? If so, could you please explain why this solution is necessary?

@aiborodin aiborodin force-pushed the fix-commit-idempotence-of-dynamic-sink branch from dca52ce to 2ecb99c on September 25, 2025 09:08
@aiborodin
Contributor Author

@pvary @mxm I added the ContentFileAvroEncoder, which serialises content files using internal Iceberg Avro serialisation classes to provide backwards compatibility. This serialisation is the same as in ManifestWriter (uses InternalData), but strips out all manifest-related properties, which we don't need for content file checkpoint persistence.
There's a similarly inspired JSON-based serialiser called ContentFileParser, which is also used in Flink, but I decided to still use Avro to keep the checkpoint size minimal.
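For context, a versioned Flink serializer wrapping such an encoder could look roughly like the sketch below; ContentFileCodec stands in for the PR's ContentFileAvroEncoder, and its method names are assumptions rather than the PR's actual API. Versioning the payload is what keeps checkpoints readable across upgrades even if the encoding evolves:

import java.io.IOException;

import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.iceberg.io.WriteResult;

class WriteResultStateSerializer implements SimpleVersionedSerializer<WriteResult> {

  // Stand-in for an Avro-based encoder of data/delete files; hypothetical interface.
  interface ContentFileCodec {
    byte[] encode(WriteResult result) throws IOException;

    WriteResult decode(byte[] bytes) throws IOException;
  }

  private static final int VERSION = 1;
  private final ContentFileCodec codec;

  WriteResultStateSerializer(ContentFileCodec codec) {
    this.codec = codec;
  }

  @Override
  public int getVersion() {
    return VERSION;
  }

  @Override
  public byte[] serialize(WriteResult result) throws IOException {
    return codec.encode(result);
  }

  @Override
  public WriteResult deserialize(int version, byte[] serialized) throws IOException {
    if (version != VERSION) {
      throw new IOException("Unknown WriteResult serializer version: " + version);
    }
    return codec.decode(serialized);
  }
}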

@mxm
Contributor

mxm commented Sep 25, 2025

Hey folks! I've implemented the approach outlined in 1 and 2.

@aiborodin I just saw you also pushed an update. I'll take a look.

@mxm
Contributor

mxm commented Sep 25, 2025

We can only commit files for multiple checkpoints when there are only appends/data files in the checkpoint.

@pvary I wasn't suggesting that we combine WriteResults from multiple Flink checkpoints. I'm suggesting to combine the append-only WriteResults in each Flink checkpoint. Currently, every WriteResult is processed separately, which creates a lot of table snapshots.

Given the above, @mxm, do you still think we need to commit multiple WriteResults separately for (table, branch, checkpoint) triplet and implement the index-based solution to guarantee idempotency as you mentioned here: #14090 (comment)? If so, could you please explain why this solution is necessary?

@aiborodin Each table / branch pair requires a separate table snapshot. While we could combine multiple Flink checkpoints during recovery, I don't think there is much benefit from doing that. Apart from recovery, every checkpoint would normally be processed independently. We wouldn't gain much from optimizing the snapshots by combining commit requests from multiple checkpoints.

@aiborodin
Contributor Author

I'm suggesting to combine the append-only WriteResults in each Flink checkpoint.

@mxm Just combining the append-only WriteResults in each Flink checkpoint would not resolve the issue for WriteResults with delete files (see my comment in #14182 (comment)). We should combine both appends and deletes within the same checkpoint (implemented in this change), which is valid for equality delete semantics because records with the same equality fields would always be routed to the same writer, storing all unique equality delete keys in the same delete file.

While we could combine multiple Flink checkpoints during recovery, I don't think there is much benefit from doing that. Apart from recovery, every checkpoint would normally be processed independently. We wouldn't gain much from optimizing the snapshots by combining commit requests from multiple checkpoints.

My suggestion is to combine both appends and deletes for the same checkpoint, which I implemented in this change.

@mxm
Contributor

mxm commented Sep 26, 2025

@mxm Just combining the append-only WriteResults in each Flink checkpoint would not resolve the issue for WriteResults with delete files (see my comment in #14182 (comment)). We should combine both appends and deletes within the same checkpoint (implemented in this change), which is valid for equality delete semantics because records with the same equality fields would always be routed to the same writer, storing all unique equality delete keys in the same delete file.

I've responded in #14182 (comment). In a nutshell, my code was doing exactly what you suggest here, committing all appends / deletes from a checkpoint, but it would additionally add WriteResults from other checkpoints if those did not contain delete files. I noticed that the code was too hard to understand, so I removed this optimization.

My suggestion is to combine both appends and deletes for the same checkpoint, which I implemented in this change.

IMHO the solution you implemented goes beyond the scope of fixing the issue. I'm open to refactoring the aggregator / committer code, but I think we should make a set of minimal changes to fix the correctness issue, alongside increased test coverage. This is what I think #14182 achieves.

@aiborodin
Contributor Author

aiborodin commented Sep 29, 2025

IMHO the solution you implemented goes beyond the scope of fixing the issue. I'm open to refactoring the aggregator / committer code, but I think we should make a set of minimal changes to fix the correctness issue, alongside increased test coverage.

@mxm I do understand the simplicity of #14182, and I am okay with merging it as a first step, although I think we should follow up and merge this PR as well to draw a clear boundary between aggregating temporary WriteResults (DynamicWriteResultAggregator) and committing them to a table (DynamicCommitter). It would result in clean and maintainable code in the long term, with a clear separation of concerns.

@aiborodin aiborodin force-pushed the fix-commit-idempotence-of-dynamic-sink branch from 2ecb99c to 67bd9d1 on September 29, 2025 07:15
@aiborodin
Contributor Author

I added a regression test to verify there's only one commit per table/branch/checkpoint in the dynamic sink.
@pvary @mxm, I am looking forward to getting this change in and closing this story.

DynamicWriteResultAggregator currently produces multiple committables
per table/branch/checkpoint triplet because it aggregates write results
by WriteTarget, which is unique per schemaId, specId, and equality
fields. It violates the idempotence contract of the DynamicCommitter, as
it relies on one commit request per triplet to identify and skip already
committed requests during recovery.

Fix this by aggregating WriteResult objects by table and branch (aka
TableKey), which would emit a single committable per checkpoint.
It requires serialising the aggregated WriteResult and saving it in a
Flink checkpoint instead of a temporary manifest file, because,
according to the Iceberg spec, a single manifest must contain files with
only one partition spec, while we may aggregate write results for
potentially multiple partition specs.

Change-Id: Id5cf832af4d6f64bd619635c937e216d84df4f5b
@aiborodin aiborodin force-pushed the fix-commit-idempotence-of-dynamic-sink branch from 67bd9d1 to 3294661 on September 29, 2025 08:07
@pvary
Contributor

pvary commented Sep 29, 2025

@aiborodin: We are trying to do too many things in parallel in this PR, and it makes it hard to review.

Here are the separate features I see here:

  1. Fixing the correctness issue
  2. Adding convenience equals and hashCode implementations for BaseFile and WriteResult
  3. Moving the aggregation of the commits from the committer to the Aggregator
  4. Changing the state serialization mode for the Committable

Here is how I see the changes 1-by-1:

  1. Fixing the correctness issue is a must
  2. I'm afraid that the community will push back on this. You can separate this out into another PR and see what other people think about it. I still feel that this could be a behavioral change in the core part of Iceberg, which we should not do, but I can be convinced otherwise.
  3. This is a very good idea, and I fully support it
  4. I think the current one is not ideal, and I'm happy to talk about it, but I suggest that we do it in a separate PR

Thank you for reporting the issue and working on the solution!

@aiborodin
Contributor Author

@pvary thanks for summarising and highlighting the features. I will work on extracting them into separate PRs.

@aiborodin
Contributor Author

@pvary I raised #14312, which addresses your point 3. Could you please take a look?

It moves commit aggregation to DynamicWriteResultAggregator and uses the current Iceberg manifest reader and writer for serialisation. As a next step, I want to improve the serialisation mechanism and encapsulate it completely into DynamicCommittableSerializer.
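For readers following along, the manifest-based serialisation path works roughly as in this sketch; file handling and error handling are simplified, and it only covers data files:

import java.io.IOException;

import org.apache.iceberg.DataFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestWriter;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;

class ManifestRoundTrip {
  // Write the aggregated data files of one checkpoint into a temporary manifest.
  static ManifestFile write(PartitionSpec spec, OutputFile tmpFile, Iterable<DataFile> files)
      throws IOException {
    ManifestWriter<DataFile> writer = ManifestFiles.write(spec, tmpFile);
    try {
      files.forEach(writer::add);
    } finally {
      writer.close();
    }
    return writer.toManifestFile();
  }

  // Read the data files back on commit or recovery; the manifest format itself is
  // kept backwards compatible across Iceberg releases.
  static void readBack(ManifestFile manifest, FileIO io) throws IOException {
    try (CloseableIterable<DataFile> files = ManifestFiles.read(manifest, io)) {
      files.forEach(file -> System.out.println(file.path()));
    }
  }
}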

@github-actions

This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.

@github-actions github-actions bot added the stale label Nov 13, 2025
@github-actions

This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

@github-actions github-actions bot closed this Nov 21, 2025